package org.dragonnova.business.message.mq.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import kafka.common.OffsetMetadata;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.dragonnova.business.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.base.pub.util.ObjectUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;

/**
 * Per-user Kafka message listener.
 *
 * <p>Every user id passed to {@link #start(String)} gets a dedicated consumer
 * thread (run by a {@link SimpleAsyncTaskExecutor}) that polls the configured
 * topics and dispatches records to the registered {@link Listener}s:
 * {@link SimpleListener} receives records one by one, {@link BatchListener}
 * receives them per partition. Listeners that return {@code true} from
 * {@code onMessage} trigger a manual offset commit.
 */
public class KafkaMessageListener {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(KafkaMessageListener.class);

	/** Maximum time (ms) {@link #stop()} waits for consumer threads to end. */
	private static final int STOP_WAIT_TIME = 3000;

	/** Consumer task per user id; used to avoid starting duplicates. */
	private final Map<String, ListenableFuture<?>> listenerConsumerFutureMaps;
	@SuppressWarnings("rawtypes")
	private final ConsumerFactory consumerFactory;
	/** Raw Kafka consumer configuration; the group id is derived per user. */
	private Map<String, Object> configs;
	private Collection<String> topics;
	/** Registered message listeners, shared by all consumer threads. */
	private final List<Listener> listener;
	private final Trigger trigger;
	private SimpleAsyncTaskExecutor consumerExecutor;

	public KafkaMessageListener() {
		this.listener = new ArrayList<Listener>();
		this.consumerFactory = new ConsumerFactory();
		this.trigger = new TriggerTask();
		this.listenerConsumerFutureMaps = Maps.newConcurrentMap();
	}

	/**
	 * @return get topics
	 */
	public Collection<String> getTopics() {
		return topics;
	}

	/**
	 * @param topics
	 *            set topics
	 */
	public void setTopics(Collection<String> topics) {
		this.topics = topics;
	}

	/**
	 * @param configs
	 *            set configs
	 */
	public void setConfigs(Map<String, Object> configs) {
		this.configs = configs;
	}

	public void setTrigger(Boolean isTrigger) {
		this.trigger.setTrigger(isTrigger);
	}

	/**
	 * Starts a consumer thread for the given user (启动用户监听消息).
	 *
	 * @param userIdString
	 *            user id (用户id); used both to build the per-user Kafka group
	 *            id and as the key under which the consumer task is tracked
	 */
	public void start(String userIdString) {

		// Only skip when an earlier task for this user is still running.
		// BUGFIX: the original test "!isDone() || !isCancelled()" was almost
		// always true and prevented restarting a consumer that had already
		// exited; isCancelled() implies isDone(), so isDone() is sufficient.
		ListenableFuture<?> existing = listenerConsumerFutureMaps
				.get(userIdString);
		if (existing != null && !existing.isDone()) {
			return;
		}

		if (consumerExecutor == null) {
			consumerExecutor = new SimpleAsyncTaskExecutor(getName()
					+ "-KafkMessage");
		}

		if (getConfigs() != null) {
			// Each user consumes under its own group id.
			this.configs.put(Constants.KAFKA_MESSAGE_GROUP_ID_KEY, userIdString
					+ Constants.KAFKA_MESSAGE_GROUP_ID);
		}

		if (getTopics() != null && !getTopics().isEmpty()) {
			ListenableFuture<?> listenableFuture = consumerExecutor
					.submitListenable(new ConsumerRunnable(getTopics()));
			listenerConsumerFutureMaps.put(userIdString, listenableFuture);
		}
	}

	/**
	 * Registers a message listener (消息监听).
	 *
	 * @param listener
	 *            listener to add
	 */
	public void addListener(Listener listener) {
		this.listener.add(listener);
	}

	/**
	 * Unregisters a message listener (取消监听).
	 *
	 * @param listener
	 *            listener to remove
	 * @return {@code true} if the listener was registered
	 */
	public boolean removeListener(Listener listener) {
		return this.listener.remove(listener);
	}

	/** @return thread-name prefix for the consumer executor */
	private String getName() {
		return KafkaMessageListener.class.getName();
	}

	/**
	 * Stops all consumer threads, waiting at most {@link #STOP_WAIT_TIME} ms
	 * for them to finish.
	 */
	public void stop() {
		final CountDownLatch latch = new CountDownLatch(1);
		stop(new Runnable() {

			@Override
			public void run() {
				latch.countDown();
			}
		});
		try {
			latch.await(STOP_WAIT_TIME, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			// BUGFIX: do not swallow the interrupt; restore the flag.
			Thread.currentThread().interrupt();
		}

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("KafkaMessageListener stopped.");
		}
	}

	/**
	 * Cancels every tracked consumer task and invokes {@code run} once the
	 * first of them completes — or immediately when there is nothing to stop.
	 *
	 * @param run
	 *            completion callback (counts down the latch in {@link #stop()})
	 */
	private void stop(final Runnable run) {
		if (this.listenerConsumerFutureMaps.isEmpty()) {
			// BUGFIX: with no futures the callback was never invoked and
			// stop() always waited the full STOP_WAIT_TIME for nothing.
			run.run();
			return;
		}
		for (ListenableFuture<?> future : this.listenerConsumerFutureMaps
				.values()) {
			future.addCallback(new ListenableFutureCallback<Object>() {

				@Override
				public void onSuccess(Object result) {
					run.run();
				}

				@Override
				public void onFailure(Throwable ex) {
					run.run();
				}

			});
			// Interrupt the consumer thread; ConsumerRunnable treats the
			// interrupt as a stop request and exits its poll loop.
			future.cancel(true);
		}
		// Cancelled tasks are finished from our point of view; drop them so
		// start() can create fresh consumers later.
		this.listenerConsumerFutureMaps.clear();
	}

	public void destroy() throws Exception {
		stop();
	}

	/**
	 * @return get config
	 */
	public Map<String, Object> getConfigs() {
		return configs;
	}

	/**
	 * Legacy misspelled alias of {@link #getTopics()}; kept for callers that
	 * already use it.
	 *
	 * @return the configured topics
	 */
	public Collection<String> geTopics() {
		return this.topics;
	}

	/**
	 * Per-record listener (单条记录).
	 *
	 * @param <T>
	 *            record value type
	 * @param <K>
	 *            timestamp type (a boxed {@code long} at runtime)
	 */
	public interface SimpleListener<T, K> extends Listener {

		/**
		 * @return {@code true} when the record was consumed and its offset
		 *         may be committed
		 */
		boolean onMessage(String topic, T data, K timestamp);

		/** Notifies how many new records arrived for {@code topic}. */
		void onMessageCount(String topic, long count, long timestamp);

		/** @return timestamp of the last record this listener consumed */
		long getLastConsumerTimestamp(String topic);

	}

	/**
	 * Batch listener (多条记录): receives all values of one partition at once.
	 */
	public interface BatchListener extends Listener {
		@Override
		Set<String> getTopics();

		/**
		 * @return {@code true} when the batch was consumed and its offsets
		 *         may be committed
		 */
		boolean onMessage(String topic, Collection<Object> data,
				Long[] timestamp);
	}

	/**
	 * The poll loop run on a dedicated consumer thread; dispatches by topic
	 * (按topic业务处理，非通用).
	 */
	@SuppressWarnings("rawtypes")
	private class ConsumerRunnable implements SchedulingAwareRunnable {

		private final Consumer consumer;
		private volatile Collection<String> topic;
		/** Whether records go through the (not yet implemented) batch path. */
		private volatile boolean batch = false;
		private volatile boolean isRunning = false;

		public ConsumerRunnable(Collection<String> topic) {
			this.topic = topic;
			this.consumer = consumerFactory.createConsumer();

			this.consumer.subscribe(this.topic,
					createConsumerRebalanceListener(consumer));
		}

		@Override
		public void run() {

			setRunning(true);

			while (isRunning()) {

				try {
					ConsumerRecords records = consumer.poll(300);
					if (records != null && records.count() > 0) {
						invokeBatchListener(records);
					}

					// Small pause between polls to throttle the loop.
					Thread.sleep(100);
				} catch (InterruptedException e) {
					// BUGFIX: future.cancel(true) interrupts this thread.
					// The original logged and kept looping, so consumers
					// never stopped; restore the flag and leave the loop.
					Thread.currentThread().interrupt();
					setRunning(false);
				} catch (WakeupException e) {
					// wakeup() is only ever used to request shutdown.
					setRunning(false);
				} catch (Exception e) {
					LOGGER.error("consumer get message error.", e);
					if (Thread.currentThread().isInterrupted()) {
						// The Kafka client wraps interrupts in runtime
						// exceptions; an interrupted thread means "stop".
						setRunning(false);
					}
				}
			}

			// Flush outstanding manual acks, then release the consumer.
			commitManualAcks();
			consumer.unsubscribe();

			try {
				consumer.close();
			} catch (Exception e) {
				LOGGER.error("consumer close error.", e);
			}

			setRunning(false);
		}

		private void setRunning(boolean status) {
			isRunning = status;
		}

		private boolean isRunning() {
			return isRunning;
		}

		@SuppressWarnings({ "unchecked", "rawtypes" })
		private void invokeBatchListener(final ConsumerRecords records) {
			if (this.isBatch()) {
				processBatchListener(records);
			} else {
				processSingleTopicRecord(records);
			}
		}

		/**
		 * Default (non-batch) processing: first publish the per-topic counts,
		 * then deliver the records themselves (单条处理消息).
		 *
		 * @param records
		 *            records returned by the last poll
		 */
		private void processSingleTopicRecord(ConsumerRecords records) {

			notifyCountEvent(records);

			notifyListener(records);
		}

		/**
		 * @return the partition whose newest record carries the largest
		 *         timestamp, or {@code null} when no record has a timestamp
		 */
		private TopicPartition getLastPartition(ConsumerRecords records) {
			TopicPartition partition = null;
			Set<TopicPartition> partitions = records.partitions();
			long lastTs = 0;
			for (TopicPartition temp : partitions) {
				List<ConsumerRecord> partitionRecords = records.records(temp);
				ConsumerRecord newest = partitionRecords.get(partitionRecords
						.size() - 1);
				if (newest.timestamp() != ConsumerRecord.NO_TIMESTAMP
						&& newest.timestamp() > lastTs) {
					// BUGFIX: lastTs was never updated, so every partition
					// compared against 0 and the last one iterated won.
					lastTs = newest.timestamp();
					partition = temp;
				}
			}
			return partition;
		}

		/**
		 * Dispatches the polled records to the listeners, partition by
		 * partition (派发消息).
		 *
		 * @param consumerRecord
		 *            records returned by the last poll
		 * @return always {@code true} (kept for signature compatibility)
		 */
		private boolean notifyListener(ConsumerRecords consumerRecord) {
			Set<TopicPartition> topicPartitions = consumerRecord.partitions();

			for (TopicPartition tp : topicPartitions) {
				// BUGFIX: use the records of THIS partition. The original
				// iterated records(tp.topic()) — all partitions of the topic —
				// yet committed offsets against tp, which could acknowledge
				// one partition's offsets on another (and delivered the same
				// topic records once per partition).
				List<ConsumerRecord> partitionRecords = consumerRecord
						.records(tp);
				for (Listener l : KafkaMessageListener.this.listener) {
					if (!l.getTopics().contains(tp.topic())) {
						continue;
					}
					if (l instanceof SimpleListener) {
						SimpleListener l1 = (SimpleListener) l;
						for (ConsumerRecord record : partitionRecords) {
							// true means "consumed": acknowledge by
							// committing the NEXT offset for this partition.
							@SuppressWarnings("unchecked")
							boolean isConsumer = l1.onMessage(tp.topic(),
									record.value(), record.timestamp());

							if (isConsumer) {
								commitManualAcks(Collections.singletonMap(tp,
										new OffsetAndMetadata(record.offset() + 1)));
							}
						}
					} else if (l instanceof BatchListener) {
						BatchListener l1 = (BatchListener) l;
						boolean isConsumer = l1.onMessage(tp.topic(),
								getValue(partitionRecords),
								getTimestamps(partitionRecords));
						if (isConsumer) {
							commitManualAcks(tp);
						}
					}
				}
			}
			return true;
		}

		/**
		 * Publishes per-topic "new record" counts to the
		 * {@link SimpleListener}s (派发消息统计).
		 *
		 * @param consumerRecords
		 *            records returned by the last poll
		 */
		private void notifyCountEvent(ConsumerRecords consumerRecords) {
			// Group records by topic, ordered by timestamp within a topic.
			Multimap<String, ConsumerRecord> topicCounts = TreeMultimap
					.<String, ConsumerRecord> create(new Comparator<String>() {

						@Override
						public int compare(String o1, String o2) {
							return o1.compareTo(o2);
						}

					}, new Comparator<ConsumerRecord>() {

						@Override
						public int compare(ConsumerRecord o1, ConsumerRecord o2) {
							int byTopic = o1.topic().compareTo(o2.topic());
							if (byTopic != 0) {
								return byTopic;
							}
							int byTs = Long.compare(o1.timestamp(),
									o2.timestamp());
							if (byTs != 0) {
								return byTs;
							}
							// BUGFIX: TreeMultimap keeps values in a sorted
							// SET; without tie-breakers, records that share a
							// timestamp compared equal and were silently
							// dropped from the counts.
							int byPartition = Integer.compare(o1.partition(),
									o2.partition());
							if (byPartition != 0) {
								return byPartition;
							}
							return Long.compare(o1.offset(), o2.offset());
						}
					});
			for (Iterator iter = consumerRecords.iterator(); iter.hasNext();) {
				ConsumerRecord consumerRecord = (ConsumerRecord) iter.next();
				topicCounts.put(consumerRecord.topic(), consumerRecord);
			}
			for (String topicString : topicCounts.keySet()) {
				for (Listener l : KafkaMessageListener.this.listener) {
					if (!(l instanceof SimpleListener)) {
						continue;
					}
					SimpleListener l1 = (SimpleListener) l;
					int count = getNewestRecordSize(
							topicCounts.get(topicString),
							l1.getLastConsumerTimestamp(topicString));
					if (count > 0) {
						long lastTs = getLastConsumerTs(topicCounts
								.get(topicString));
						// BUGFIX: report the number of NEW records; the
						// original computed count but then passed the full
						// batch size to the listener.
						l1.onMessageCount(topicString, count, lastTs);
					}
				}
			}
		}

		/**
		 * Counts the records newer than the listener's last-consumed
		 * timestamp (获取最新记录).
		 *
		 * @param records
		 *            all records of one topic
		 * @param lastTs
		 *            last consumed timestamp; 0 means "never consumed", in
		 *            which case every record counts
		 * @return number of records with timestamp &gt;= {@code lastTs}
		 */
		private int getNewestRecordSize(Collection<ConsumerRecord> records,
				long lastTs) {
			if (lastTs == 0) {
				return records.size();
			}

			int newer = 0;
			for (ConsumerRecord record : records) {
				if (record.timestamp() >= lastTs) {
					newer++;
				}
			}
			return newer;
		}

		/**
		 * @return the timestamp of the last record in iteration order (the
		 *         newest one — the collection is timestamp-sorted by the
		 *         TreeMultimap in {@link #notifyCountEvent})
		 */
		private long getLastConsumerTs(Collection<ConsumerRecord> records) {
			List<ConsumerRecord> temp = Lists.newArrayList(records);
			return temp.get(temp.size() - 1).timestamp();
		}

		/**
		 * Batch delivery path. Not implemented yet — {@link #isBatch()} is
		 * never set to {@code true} in this file, so this is unreachable in
		 * practice. TODO(review): implement or remove together with
		 * {@link #setBatch(Boolean)}.
		 */
		private void processBatchListener(ConsumerRecords records) {
			// intentionally a no-op; see Javadoc
		}

		/**
		 * @return the timestamps of all records, in iteration order
		 */
		private Long[] getTimestamps(Iterable dataIterable) {
			List<Long> data = new ArrayList<Long>();
			for (Object o : dataIterable) {
				data.add(((ConsumerRecord) o).timestamp());
			}
			return data.toArray(new Long[data.size()]);
		}

		/**
		 * @return the values of all records, in iteration order
		 */
		private Collection<Object> getValue(Iterable dataIterable) {
			List<Object> data = new ArrayList<Object>();
			for (Object o : dataIterable) {
				data.add(((ConsumerRecord) o).value());
			}
			return data;
		}

		/**
		 * @return a no-op rebalance listener (hooks kept for future use)
		 */
		public ConsumerRebalanceListener createConsumerRebalanceListener(
				final Consumer consumer) {
			return new ConsumerRebalanceListener() {

				@Override
				public void onPartitionsRevoked(
						Collection<TopicPartition> partitions) {
				}

				@Override
				public void onPartitionsAssigned(
						Collection<TopicPartition> partitions) {
				}

			};
		}

		/**
		 * Commits consumed offsets after a {@link BatchListener} acknowledged
		 * a partition.
		 */
		private void commitManualAcks(TopicPartition tp) {
			try {
				// BUGFIX: the original called consumer.committed(tp), which
				// only READS the committed offset and never acknowledged
				// anything; actually commit the consumed positions.
				consumer.commitAsync();
			} catch (WakeupException e) {
				LOGGER.warn("offset commit aborted by wakeup.", e);
			}
		}

		/** Commits all consumed offsets (used on shutdown). */
		private void commitManualAcks() {
			try {
				consumer.commitAsync();
			} catch (WakeupException e) {
				LOGGER.warn("offset commit aborted by wakeup.", e);
			}
		}

		/**
		 * Asynchronously commits the given per-partition offsets, logging the
		 * outcome.
		 */
		@SuppressWarnings("unchecked")
		private void commitManualAcks(
				Map<TopicPartition, OffsetAndMetadata> offsets) {
			consumer.commitAsync(offsets, new OffsetCommitCallback() {

				@Override
				public void onComplete(
						Map<TopicPartition, OffsetAndMetadata> offsets,
						Exception exception) {
					if (exception == null) {
						if (LOGGER.isDebugEnabled()) {
							LOGGER.debug("consumer commit completed. ");
						}
					} else {
						LOGGER.debug("consumer commit error.", exception);
					}
				}
			});
		}

		@Override
		public boolean isLongLived() {
			return true;
		}

		/**
		 * @return get batch
		 */
		public Boolean isBatch() {
			return batch;
		}

		/**
		 * @param batch
		 *            set batch; {@code null} is treated as {@code false}
		 */
		public void setBatch(Boolean batch) {
			this.batch = batch != null && batch.booleanValue();
		}

	}

	/** Simple volatile-flag implementation of {@link Trigger}. */
	private class TriggerTask implements Trigger {

		// volatile: written by callers, read by consumer threads.
		private volatile boolean isTrigger = false;

		@Override
		public void setTrigger(boolean isTrigger) {
			this.isTrigger = isTrigger;
		}

		@Override
		public Boolean isFired() {
			return this.isTrigger;
		}
	}

	private interface Trigger {

		void setTrigger(boolean isTrigger);

		Boolean isFired();

	}

	/**
	 * Creates raw {@link KafkaConsumer}s from the listener's configuration
	 * (消费工厂).
	 *
	 * @param <K>
	 *            record key type
	 * @param <V>
	 *            record value type
	 */
	private class ConsumerFactory<K, V> {

		/**
		 * Reads {@code enable.auto.commit} from the configuration.
		 *
		 * <p>Accepts both Boolean and String values ("true"/"false") — the
		 * forms Kafka itself tolerates — and defaults to {@code false}.
		 */
		public Boolean isAutoCommit() {
			Map<String, Object> cfg = getConfigs();
			Object auto = (cfg == null) ? null : cfg
					.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
			if (auto instanceof Boolean) {
				return (Boolean) auto;
			}
			// BUGFIX: the original cast-and-catch returned false for the
			// common String form of the property (and NPEd on null configs).
			return auto != null && Boolean.parseBoolean(auto.toString());
		}

		private Consumer<K, V> createConsumer() {
			return new KafkaConsumer<K, V>(getConfigs());
		}

	}

	/**
	 * Acknowledgement modes (消费监听模式). Currently informational only; no
	 * code in this file switches on these values.
	 */
	public enum ConsumerAckMode {
		RECORD, BATCH, TIME, COUNT, MANUAL,
	}

}
